package ua.naiksoftware.stomp.provider;


import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import io.reactivex.subjects.PublishSubject;
import ohos.hiviewdfx.HiLog;
import ohos.hiviewdfx.HiLogLabel;
import ua.naiksoftware.stomp.dto.LifecycleEvent;

import java.util.Collections;

/**
 * Created by forresthopkinsa on 8/8/2017.
 * Created because there was a lot of shared code between JWS and OkHttp connection providers.
 *
 * @since 2021-04-27
 */
public abstract class AbstractConnectionProvider implements ConnectionProvider {
    /** HiLog label shared by the log statements of this class. */
    static final HiLogLabel LABEL = new HiLogLabel(HiLog.LOG_APP, 0x00201, "AbstractConnectionProvider");

    /** Carries connection lifecycle events pushed through {@link #emitLifecycleEvent(LifecycleEvent)}. */
    @NonNull
    private final PublishSubject<LifecycleEvent> lifecycleStream;

    /** Carries raw STOMP frames pushed through {@link #emitMessage(String)}. */
    @NonNull
    private final PublishSubject<String> messagesStream;

    /**
     * Creates the provider with empty lifecycle and message subjects.
     * No connection is opened here; see {@link #messages()}.
     */
    public AbstractConnectionProvider() {
        lifecycleStream = PublishSubject.create();
        messagesStream = PublishSubject.create();
    }

    /**
     * Returns the stream of incoming messages.
     * <p>
     * The returned observable is prefixed with the socket initialisation step, so every
     * subscription first runs {@link #createWebSocketConnection()} and only afterwards
     * relays what is emitted through {@link #emitMessage(String)}.
     *
     * @return observable of raw STOMP messages
     */
    @NonNull
    @Override
    public Observable<String> messages() {
        return messagesStream.startWith(initSocket().toObservable());
    }

    /**
     * Simply close socket.
     * <p>
     * For example:
     * <pre>
     * webSocket.close();
     * </pre>
     */
    protected abstract void rawDisconnect();

    /**
     * Returns a completable that calls {@link #rawDisconnect()} when subscribed.
     *
     * @return completable wrapping the disconnect action
     */
    @Override
    public Completable disconnect() {
        return Completable.fromAction(this::rawDisconnect);
    }

    /**
     * Wraps {@link #createWebSocketConnection()} so that it runs on subscription.
     *
     * @return completable wrapping the connection setup
     */
    private Completable initSocket() {
        return Completable.fromAction(this::createWebSocketConnection);
    }

    /**
     * Most important method: connects to websocket and notifies program of messages.
     * <p>
     * See implementations in OkHttpConnectionProvider and WebSocketsConnectionProvider.
     */
    protected abstract void createWebSocketConnection();

    /**
     * Returns a completable that sends the given message when subscribed.
     * <p>
     * The socket is checked at subscription time: if {@link #getSocket()} returns
     * {@code null}, the completable fails with an {@link IllegalStateException}
     * instead of calling {@link #rawSend(String)}.
     *
     * @param stompMessage message to send
     * @return completable wrapping the send action
     */
    @NonNull
    @Override
    public Completable send(String stompMessage) {
        // fromAction instead of fromCallable: there is no result to return, so no dummy value is needed.
        return Completable.fromAction(() -> {
            if (getSocket() == null) {
                throw new IllegalStateException("Not connected");
            }
            rawSend(stompMessage);
        });
    }

    /**
     * Just a simple message send.
     * <p>
     * For example:
     * <pre>
     * webSocket.send(stompMessage);
     * </pre>
     *
     * @param stompMessage message to send
     */
    protected abstract void rawSend(String stompMessage);

    /**
     * Get socket object.
     * Used for null checking; this object is expected to be null when the connection is not yet established.
     *
     * @return the socket object, or {@code null} when there is no connection
     */
    @Nullable
    protected abstract Object getSocket();

    /**
     * Logs the event type and pushes the event to the lifecycle stream.
     *
     * @param lifecycleEvent event to publish to {@link #lifecycle()} subscribers
     */
    protected void emitLifecycleEvent(@NonNull LifecycleEvent lifecycleEvent) {
        // Routine trace output: debug level, with the value passed as an argument rather than
        // concatenated into the format string. %{public} keeps the value readable in the log.
        HiLog.debug(LABEL, "Emit lifecycle event: %{public}s", lifecycleEvent.getType().name());
        lifecycleStream.onNext(lifecycleEvent);
    }

    /**
     * Logs the received message and pushes it to the message stream.
     *
     * @param stompMessage raw message to publish to {@link #messages()} subscribers
     */
    protected void emitMessage(String stompMessage) {
        // The payload is external data; keep it out of the format string so any '%' in it
        // is never treated as part of the format.
        HiLog.debug(LABEL, "Receive STOMP message: %{public}s", stompMessage);
        messagesStream.onNext(stompMessage);
    }

    /**
     * Returns the stream of lifecycle events emitted through
     * {@link #emitLifecycleEvent(LifecycleEvent)}.
     *
     * @return observable of lifecycle events
     */
    @NonNull
    @Override
    public Observable<LifecycleEvent> lifecycle() {
        return lifecycleStream;
    }
}
